Exploring Heating Problems in Manhattan

In this notebook, you will learn how to query data with Spark SQL by downloading and investigating New York City's 311 Complaints data, which is available as part of NYC Open Data.

Of particular interest are heating complaints within the Manhattan borough. You will explore the data using Spark's SQL module and the visualization tool Brunel.

During the months of October to May, residents of NYC can call 311 to report that a building doesn't have enough heat or hot water. In the remaining months, June to September, complaints can be made that heating has been left on.

There may be a number of different factors that contribute to heating complaints; you will select a few of the features available in the data to see whether they suggest any correlation.


Load the Necessary Packages

First, install the Brunel package, which will be used to visualize the data. Then import the os module, which allows you to interface with the underlying operating system that Python is running on; matplotlib.pyplot, a collection of command-style functions that make matplotlib work like MATLAB; pandas, the main Python library for data analysis; and finally Brunel itself.


In [ ]:
!pip install --user brunel

In [ ]:
import os
import matplotlib.pyplot as plt
import pandas as pd

# Point Brunel at the JavaScript assets served by this notebook environment
os.environ['BRUNEL_CONFIG'] = "locjavascript=/data/jupyter2/static-file-content-delivery-network/nbextensions/brunel_ext"
import brunel  # Brunel will be used to visualize data later on

Read the Data

The data was previously saved to an Object Store, so the following code is an example of the credentials needed to access and read it. Use the Insert to code function to insert your own credentials. The read statement explicitly tells Spark that the data is in CSV format, has a header row, and that the schema should be inferred. The result is saved as a DataFrame named nyc311DF.


In [ ]:
import ibmos2spark

credentials = {
    'endpoint': 'https://s3-api.us-geo.objectstorage.service.networklayer.com',
    'api_key': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'service_id': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'iam_service_endpoint': 'https://iam.ng.bluemix.net/oidc/token'}

configuration_name = 'os_xxxxxxxxxxxxxxxxxxxxxxxxxx_configs'
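# sc is the SparkContext pre-defined by the notebook environment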
cos = ibmos2spark.CloudObjectStorage(sc, credentials, configuration_name, 'bluemix_cos')

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
nyc311DF = spark.read\
  .format('csv')\
  .option('header', 'true')\
  .option('inferSchema', 'true')\
  .load(cos.url('IAE_examples_data_311NYC.csv', 'trafficanalysisxxxxxxxxxxxxxxxxxxxxxxx'))
nyc311DF.take(5)

This next statement displays the number of rows in the data set.


In [ ]:
nyc311DF.count()

Now let's display the schema to see what columns exist in order to select a few to investigate.


In [ ]:
nyc311DF.printSchema()

Spark SQL Exploration

Spark SQL is a powerful tool that gives users an (often) familiar and (relatively) intuitive way to explore the data. To refer to a DataFrame within an SQL query, it first needs to be registered as a view. The statement below creates a temporary view named nyc311ct, and the query after it lists the distinct boroughs in the data.


In [ ]:
nyc311DF.createOrReplaceTempView("nyc311ct")

In [ ]:
spark.sql("select distinct Borough from nyc311ct").show()

Let's find the complaint type with the most complaints in Manhattan. Note that this statement calls the cache function, which means that when the next action is called ("show", "count", and so on), the DataFrame nyc311Agr_df will be stored in memory for much quicker retrieval later. However, the DataFrame must be small enough to fit in memory.


In [ ]:
nyc311Agr_df = spark.sql("select `Complaint Type` as Complaint_Type, count(`Unique Key`) as Complaint_Count "
                            "from nyc311ct where Borough = 'MANHATTAN' "
                            "group by `Complaint Type` order by Complaint_Count desc").cache()

In [ ]:
nyc311Agr_df.show(4)
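
After "show" is called, nyc311Agr_df is held in memory. As a quick optional check (not part of the original lab), you can confirm that the DataFrame is cached and, once you no longer need it, release the memory with unpersist.


In [ ]:
# Optional: confirm the DataFrame is cached, and free the memory when finished.
nyc311Agr_df.is_cached        # True once the DataFrame has been cached by an action
# nyc311Agr_df.unpersist()    # uncomment to remove it from memory when no longer needed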

Let's get a visual representation of the data within nyc311Agr_df. The next cell converts it to a pandas DataFrame, which is then used to create a bubble chart in the cell after that, where the size of each bubble represents the number of complaints. Each complaint type is assigned a color and, if the bubble is large enough, labeled; otherwise the type is shown as a tooltip when you hover over the bubble.


In [ ]:
custom_frame = nyc311Agr_df.toPandas()
custom_frame.head(4)

In [ ]:
%brunel data('custom_frame') bubble size(Complaint_Count) color(Complaint_Type) label(Complaint_Type) legends(none) tooltip(Complaint_Type)
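
Since matplotlib and pandas were imported earlier, the same aggregation can also be plotted without Brunel. The following cell is an optional sketch, not part of the original lab, that draws a horizontal bar chart of the ten most frequent complaint types in custom_frame.


In [ ]:
# Optional sketch: a matplotlib bar chart of the ten most frequent complaint types.
top10 = custom_frame.head(10).set_index('Complaint_Type')
top10['Complaint_Count'].plot(kind='barh', figsize=(8, 5))
plt.xlabel('Number of complaints')
plt.title('Top 10 complaint types in Manhattan')
plt.gca().invert_yaxis()  # show the largest count at the top
plt.show()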

How does the number of complaints vary by zip code? Let's remove any data points where a zip code hasn't been provided and filter to complaints of type 'HEAT/HOT WATER'. Note: if you are just exploring the data and do not intend to re-use the result, you can call "show" on the Spark SQL query directly without assigning it to a variable.


In [ ]:
spark.sql("select `Incident Zip` as Zip, count(*) as ZipHeatingCnt " 
          "from nyc311ct " 
          "where `Complaint Type` = 'HEAT/HOT WATER' and `Incident Zip` <> '' group by `Incident Zip`").show()

Similarly, if you wish to use the result of a query in later queries but do not need to keep it as a DataFrame variable, you can register a temporary view directly from the query, as seen in the next cell.


In [ ]:
spark.sql("select `Incident Zip` as Zip, count(*) as ZipHeatingCnt "  
          "from nyc311ct " 
          "where `Complaint Type` = 'HEAT/HOT WATER' and `Incident Zip` <> '' group by `Incident Zip`").createOrReplaceTempView("zipHeatingCnt")

Let's see which dates and zip codes had the most complaints. The 'Created Date' field includes a time component, so use the "split" function to keep just the date, and limit the data to heat/hot water complaints from 2016.


In [ ]:
spark.sql("select split(`Created Date`, ' ')[0] as Incident_Date, `Incident Zip` as Incident_Zip, "
          "count(`Unique Key`) as HeatingComplaintCount "
          "from nyc311ct where `Complaint Type` = 'HEAT/HOT WATER' and `Incident Zip` <> '' "
          "and split(split(`Created Date`, ' ')[0], '/')[2] = '16' "
          "group by split(`Created Date`, ' ')[0], `Incident Zip` order by HeatingComplaintCount desc limit 50").show()

This concludes the tutorial on how to query data using Spark SQL.

